// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::{anyhow, bail, ensure, Result};
use async_trait::async_trait;
use futures::{future, lock::Mutex};
use k8s_openapi::api::core::v1::Pod;
use kube::{api::ListParams, Api, Client};
use linera_base::{
    command::{resolve_binary, CommandExt},
    data_types::Amount,
};
use linera_client::client_options::ResourceControlPolicyConfig;
use tokio::{process::Command, task::JoinSet};

use crate::cli_wrappers::{
    docker::{BuildArg, DockerImage, Dockerfile},
    helmfile::{HelmFile, DEFAULT_BLOCK_EXPORTER_PORT},
    kind::KindCluster,
    kubectl::KubectlInstance,
    local_net::PathProvider,
    util::get_github_root,
    ClientWrapper, LineraNet, LineraNetConfig, Network, OnClientDrop,
};

/// The Cargo profile used when building the Linera binaries for the Docker image.
#[derive(Clone, clap::Parser, clap::ValueEnum, Debug, Default)]
pub enum BuildMode {
    /// Build with the `debug` profile (faster builds, slower binaries).
    Debug,
    /// Build with the `release` profile (the default).
    #[default]
    Release,
}

impl std::str::FromStr for BuildMode {
    type Err = String;

    /// Parses a build mode name case-insensitively, delegating to the
    /// `clap::ValueEnum` implementation derived on [`BuildMode`].
    fn from_str(input: &str) -> Result<Self, Self::Err> {
        <Self as clap::ValueEnum>::from_str(input, /* ignore_case */ true)
    }
}

impl std::fmt::Display for BuildMode {
    /// Formats the build mode using its `Debug` representation
    /// (`"Debug"` or `"Release"`).
    fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
        std::fmt::Debug::fmt(self, formatter)
    }
}

/// The information needed to start a [`LocalKubernetesNet`].
pub struct LocalKubernetesNetConfig {
    /// The network protocol configuration used by validators and clients.
    pub network: Network,
    /// Optional seed for deterministic key generation in tests; bumped after each use.
    pub testing_prng_seed: Option<u64>,
    /// Number of additional initial chains to create in the genesis configuration.
    pub num_other_initial_chains: u32,
    /// Token amount given to each initial chain.
    pub initial_amount: Amount,
    /// Number of validators; one kind cluster is created per validator.
    pub num_initial_validators: usize,
    /// Number of proxy instances per validator.
    pub num_proxies: usize,
    /// Number of shard instances per validator.
    pub num_shards: usize,
    /// How the Linera binaries are obtained for the Docker image build.
    pub binaries: BuildArg,
    /// If `true`, skip building Docker images and use the existing image names.
    pub no_build: bool,
    /// Name of the main validator Docker image.
    pub docker_image_name: String,
    /// Cargo profile used when building the binaries.
    pub build_mode: BuildMode,
    /// Resource control policy passed to the genesis configuration.
    pub policy_config: ResourceControlPolicyConfig,
    /// Number of block exporter instances per validator (0 disables them).
    pub num_block_exporters: usize,
    /// Name of the indexer Docker image (only used when block exporters are enabled).
    pub indexer_image_name: String,
    /// Name of the explorer Docker image (only used when block exporters are enabled).
    pub explorer_image_name: String,
    // Forwarded to the Docker build and Helm charts; presumably selects the
    // dual-storage backend setup — confirm against the chart values.
    pub dual_store: bool,
    /// Provides the directory where configuration files are written.
    pub path_provider: PathProvider,
}

/// A set of Linera validators running in local Kubernetes (kind) clusters.
#[derive(Clone)]
pub struct LocalKubernetesNet {
    network: Network,
    // Bumped after each use so every client/config generation gets a fresh seed.
    testing_prng_seed: Option<u64>,
    // Monotonically increasing id handed to each `ClientWrapper`.
    next_client_id: usize,
    binaries: BuildArg,
    no_build: bool,
    docker_image_name: String,
    build_mode: BuildMode,
    // Shared so concurrent per-validator initialization tasks can register
    // their port-forward child processes.
    kubectl_instance: Arc<Mutex<KubectlInstance>>,
    // One kind cluster per validator.
    kind_clusters: Vec<KindCluster>,
    num_initial_validators: usize,
    num_proxies: usize,
    num_shards: usize,
    num_block_exporters: usize,
    indexer_image_name: String,
    explorer_image_name: String,
    dual_store: bool,
    path_provider: PathProvider,
}

#[async_trait]
impl LineraNetConfig for LocalKubernetesNetConfig {
    type Net = LocalKubernetesNet;

    /// Creates one kind cluster per validator, generates the validator and
    /// genesis configurations, and starts the network.
    ///
    /// Returns the running network together with a client already set up
    /// against the genesis configuration.
    ///
    /// # Errors
    ///
    /// Fails if `num_initial_validators` is zero, or if configuration
    /// generation, genesis creation, or network startup fails.
    async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
        ensure!(
            self.num_initial_validators > 0,
            "There should be at least one initial validator"
        );

        // Create one kind cluster per initial validator, concurrently.
        let clusters = future::join_all((0..self.num_initial_validators).map(|_| async {
            KindCluster::create()
                .await
                .expect("Creating kind cluster should not fail")
        }))
        .await;

        let mut net = LocalKubernetesNet::new(
            self.network,
            self.testing_prng_seed,
            self.binaries,
            self.no_build,
            self.docker_image_name,
            self.build_mode,
            KubectlInstance::new(Vec::new()),
            clusters,
            self.num_initial_validators,
            self.num_proxies,
            self.num_shards,
            self.num_block_exporters,
            self.indexer_image_name,
            self.explorer_image_name,
            self.dual_store,
            self.path_provider,
        )?;

        let client = net.make_client().await;
        // Propagate failures with `?` instead of panicking with `unwrap()`:
        // this function already returns a `Result`, so callers can handle
        // setup errors gracefully.
        net.generate_initial_validator_config().await?;
        client
            .create_genesis_config(
                self.num_other_initial_chains,
                self.initial_amount,
                self.policy_config,
                Some(vec!["localhost".to_owned()]),
            )
            .await?;
        net.run().await?;

        Ok((net, client))
    }
}

#[async_trait]
impl LineraNet for Arc<Mutex<LocalKubernetesNet>> {
    /// Delegates to the inner [`LocalKubernetesNet::ensure_is_running`].
    async fn ensure_is_running(&mut self) -> Result<()> {
        // `Mutex::lock` only needs `&self`, so no `Arc` clone is required;
        // the guard is dropped at the end of the statement.
        self.lock().await.ensure_is_running().await
    }

    /// Delegates to the inner [`LocalKubernetesNet::make_client`].
    async fn make_client(&mut self) -> ClientWrapper {
        self.lock().await.make_client().await
    }

    async fn terminate(&mut self) -> Result<()> {
        // Users are responsible for killing the clusters if they want to
        Ok(())
    }
}

#[async_trait]
impl LineraNet for LocalKubernetesNet {
    /// Checks that every proxy and shard pod in the `default` namespace is in
    /// the `Running` phase.
    ///
    /// # Errors
    ///
    /// Fails if the Kubernetes client cannot be created, if listing pods
    /// fails, or if any matching pod reports a phase other than `Running`.
    async fn ensure_is_running(&mut self) -> Result<()> {
        let client = Client::try_default().await?;
        let pods: Api<Pod> = Api::namespaced(client, "default");

        // Proxies and shards carry the `app=proxy` / `app=shards` labels;
        // the same phase check applies to both, so run it once per label.
        for (label, role) in [("app=proxy", "Validator"), ("app=shards", "Shard")] {
            let list_params = ListParams::default().labels(label);
            for pod in pods.list(&list_params).await? {
                if let Some(phase) = pod.status.and_then(|status| status.phase) {
                    if phase != "Running" {
                        bail!(
                            "{} {} is not Running",
                            role,
                            pod.metadata
                                .name
                                .expect("Fetching pod name should not fail")
                        );
                    }
                }
            }
        }

        Ok(())
    }

    /// Creates a new [`ClientWrapper`], bumping `next_client_id` and — when a
    /// testing PRNG seed is configured — deriving a fresh seed for the next
    /// client.
    async fn make_client(&mut self) -> ClientWrapper {
        let client = ClientWrapper::new(
            self.path_provider.clone(),
            self.network,
            self.testing_prng_seed,
            self.next_client_id,
            OnClientDrop::LeakChains,
        );
        if let Some(seed) = self.testing_prng_seed {
            self.testing_prng_seed = Some(seed + 1);
        }
        self.next_client_id += 1;
        client
    }

    /// Kills all `kubectl port-forward` child processes and deletes the kind
    /// clusters, collecting every failure instead of stopping at the first.
    async fn terminate(&mut self) -> Result<()> {
        let mut kubectl_instance = self.kubectl_instance.lock().await;
        let mut errors = Vec::new();
        for port_forward_child in &mut kubectl_instance.port_forward_children {
            if let Err(e) = port_forward_child.kill().await {
                errors.push(e.into());
            }
        }

        for kind_cluster in &mut self.kind_clusters {
            if let Err(e) = kind_cluster.delete().await {
                errors.push(e);
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            let err_str = if errors.len() > 1 {
                "Multiple errors"
            } else {
                "One error"
            };

            // Chain all collected errors into a single `anyhow` error so none
            // of them is lost.
            Err(errors
                .into_iter()
                .fold(anyhow!("{err_str} occurred"), |acc, e: anyhow::Error| {
                    acc.context(e)
                }))
        }
    }
}

impl LocalKubernetesNet {
    /// Creates a new [`LocalKubernetesNet`] from its parts.
    ///
    /// `next_client_id` starts at `0`, and the `kubectl_instance` is wrapped
    /// in an `Arc<Mutex<_>>` so the per-validator initialization futures in
    /// `run` can share it.
    #[expect(clippy::too_many_arguments)]
    fn new(
        network: Network,
        testing_prng_seed: Option<u64>,
        binaries: BuildArg,
        no_build: bool,
        docker_image_name: String,
        build_mode: BuildMode,
        kubectl_instance: KubectlInstance,
        kind_clusters: Vec<KindCluster>,
        num_initial_validators: usize,
        num_proxies: usize,
        num_shards: usize,
        num_block_exporters: usize,
        indexer_image_name: String,
        explorer_image_name: String,
        dual_store: bool,
        path_provider: PathProvider,
    ) -> Result<Self> {
        Ok(Self {
            network,
            testing_prng_seed,
            next_client_id: 0,
            binaries,
            no_build,
            docker_image_name,
            build_mode,
            kubectl_instance: Arc::new(Mutex::new(kubectl_instance)),
            kind_clusters,
            num_initial_validators,
            num_proxies,
            num_shards,
            num_block_exporters,
            indexer_image_name,
            explorer_image_name,
            dual_store,
            path_provider,
        })
    }

    /// Resolves the named Linera binary and returns a [`Command`] for it with
    /// the working directory set to the configuration directory.
    async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
        let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
        let mut command = Command::new(path);
        command.current_dir(self.path_provider.path());
        Ok(command)
    }

    /// Writes the TOML configuration file for validator `validator_number`
    /// (`validator_{n}.toml` in the configuration directory) and returns its
    /// path as a `String`.
    ///
    /// The file lists the validator's endpoint plus one `[[proxies]]`,
    /// `[[shards]]` and (when enabled) `[[block_exporters]]` entry per
    /// configured instance, using in-cluster Kubernetes DNS names.
    fn configuration_string(&self, validator_number: usize) -> Result<String> {
        let path = self
            .path_provider
            .path()
            .join(format!("validator_{validator_number}.toml"));
        // Each validator gets a distinct public port; private and metrics
        // ports are fixed since each validator lives in its own cluster.
        let public_port = 19100 + validator_number;
        let private_port = 20100;
        let metrics_port = 21100;
        let protocol = self.network.toml();
        let host = self.network.localhost();
        let mut content = format!(
            r#"
                server_config_path = "server_{validator_number}.json"
                host = "{host}"
                port = {public_port}
                external_protocol = {protocol}
                internal_protocol = {protocol}

            "#
        );

        // One entry per proxy, addressed through the headless service DNS
        // names created by the Helm charts.
        for proxy_id in 0..self.num_proxies {
            content.push_str(&format!(
                r#"
                    [[proxies]]
                    host = "proxy-{proxy_id}.proxy-internal.default.svc.cluster.local"
                    public_port = {public_port}
                    private_port = {private_port}
                    metrics_port = {metrics_port}
                "#
            ));
        }

        // One entry per shard, likewise addressed by in-cluster DNS.
        for shard_id in 0..self.num_shards {
            content.push_str(&format!(
                r#"

                [[shards]]
                host = "shards-{shard_id}.shards.default.svc.cluster.local"
                port = {public_port}
                metrics_port = {metrics_port}
                "#
            ));
        }

        // Block exporters are optional; the outer `if` is redundant with the
        // empty range but kept for clarity.
        if self.num_block_exporters > 0 {
            for exporter_num in 0..self.num_block_exporters {
                let block_exporter_port = DEFAULT_BLOCK_EXPORTER_PORT;
                let block_exporter_host =
                    format!("linera-block-exporter-{exporter_num}.linera-block-exporter");
                let config_content = format!(
                    r#"

                        [[block_exporters]]
                        host = "{block_exporter_host}"
                        port = {block_exporter_port}
                        "#
                );

                content.push_str(&config_content);
            }
        }

        fs_err::write(&path, content)?;
        // `linera-server generate` takes the path as a UTF-8 string argument.
        path.into_os_string().into_string().map_err(|error| {
            anyhow!(
                "could not parse OS string into string: {}",
                error.to_string_lossy()
            )
        })
    }

    /// Runs `linera-server generate` to create the server configuration files
    /// and the initial committee for all validators.
    ///
    /// When a testing PRNG seed is set, it is passed along and then bumped so
    /// the next invocation uses a fresh seed.
    async fn generate_initial_validator_config(&mut self) -> Result<()> {
        let mut command = self.command_for_binary("linera-server").await?;
        command.arg("generate");
        if let Some(seed) = self.testing_prng_seed {
            command.arg("--testing-prng-seed").arg(seed.to_string());
            self.testing_prng_seed = Some(seed + 1);
        }
        command.arg("--validators");
        for validator_number in 0..self.num_initial_validators {
            command.arg(&self.configuration_string(validator_number)?);
        }
        command
            .args(["--committee", "committee.json"])
            .spawn_and_wait_for_stdout()
            .await?;
        Ok(())
    }

    /// Builds (or reuses) the Docker images, then initializes every validator
    /// cluster concurrently: loads the images into its kind cluster, copies
    /// the server configuration, syncs the Helm charts, and port-forwards the
    /// proxy service to `19100 + validator_number` on the host.
    async fn run(&mut self) -> Result<()> {
        let github_root = get_github_root().await?;
        // Build Docker images
        let (docker_image_name, indexer_image_name, explorer_image_name) = if self.no_build {
            (
                self.docker_image_name.clone(),
                self.indexer_image_name.clone(),
                self.explorer_image_name.clone(),
            )
        } else {
            // Build the main image — and, when block exporters are enabled,
            // the indexer and explorer images — in parallel.
            let mut join_set = JoinSet::new();
            join_set.spawn(DockerImage::build(
                self.docker_image_name.clone(),
                self.binaries.clone(),
                github_root.clone(),
                self.build_mode.clone(),
                self.dual_store,
                Dockerfile::Main,
            ));
            if self.num_block_exporters > 0 {
                join_set.spawn(DockerImage::build(
                    self.indexer_image_name.clone(),
                    self.binaries.clone(),
                    github_root.clone(),
                    self.build_mode.clone(),
                    self.dual_store,
                    Dockerfile::Indexer,
                ));
                join_set.spawn(DockerImage::build(
                    self.explorer_image_name.clone(),
                    self.binaries.clone(),
                    github_root.clone(),
                    self.build_mode.clone(),
                    self.dual_store,
                    Dockerfile::Explorer,
                ));
            }

            // Wait for all builds; fail if any of them failed.
            join_set
                .join_all()
                .await
                .into_iter()
                .collect::<Result<Vec<_>>>()?;

            (
                self.docker_image_name.clone(),
                self.indexer_image_name.clone(),
                self.explorer_image_name.clone(),
            )
        };

        // The Helm charts read their inputs from this working directory.
        let base_dir = github_root
            .join("kubernetes")
            .join("linera-validator")
            .join("working");
        fs_err::copy(
            self.path_provider.path().join("genesis.json"),
            base_dir.join("genesis.json"),
        )?;

        // Clone the shared handles so each per-validator future owns its data.
        let kubectl_instance_clone = self.kubectl_instance.clone();
        let path_provider_path_clone = self.path_provider.path().to_path_buf();
        let num_proxies = self.num_proxies;
        let num_shards = self.num_shards;

        let mut validators_initialization_futures = Vec::new();
        for (validator_number, kind_cluster) in self.kind_clusters.iter().cloned().enumerate() {
            let base_dir = base_dir.clone();
            let github_root = github_root.clone();

            let kubectl_instance = kubectl_instance_clone.clone();
            let path_provider_path = path_provider_path_clone.clone();

            let docker_image_name = docker_image_name.clone();
            let indexer_image_name = indexer_image_name.clone();
            let explorer_image_name = explorer_image_name.clone();
            let dual_store = self.dual_store;
            let num_block_exporters = self.num_block_exporters;
            let future = async move {
                // Make the freshly built images available inside the cluster.
                let cluster_id = kind_cluster.id();
                kind_cluster.load_docker_image(&docker_image_name).await?;
                if num_block_exporters > 0 {
                    kind_cluster.load_docker_image(&indexer_image_name).await?;
                    kind_cluster.load_docker_image(&explorer_image_name).await?;
                }

                // The Helm charts mount the server config from the working dir.
                let server_config_filename = format!("server_{}.json", validator_number);
                fs_err::copy(
                    path_provider_path.join(&server_config_filename),
                    base_dir.join(&server_config_filename),
                )?;

                HelmFile::sync(
                    validator_number,
                    &github_root,
                    num_proxies,
                    num_shards,
                    cluster_id,
                    docker_image_name,
                    num_block_exporters > 0,
                    num_block_exporters,
                    indexer_image_name,
                    explorer_image_name,
                    dual_store,
                )
                .await?;

                // Expose the in-cluster proxy on a distinct host port per
                // validator, matching the ports in `configuration_string`.
                let mut kubectl_instance = kubectl_instance.lock().await;
                let proxy_service = "svc/proxy";

                let local_port = 19100 + validator_number;
                kubectl_instance.port_forward(
                    proxy_service,
                    &format!("{local_port}:19100"),
                    cluster_id,
                )?;

                Result::<(), anyhow::Error>::Ok(())
            };

            validators_initialization_futures.push(future);
        }

        // Run all validator initializations concurrently; the `collect`
        // short-circuits on the first error.
        future::join_all(validators_initialization_futures)
            .await
            .into_iter()
            .collect()
    }
}
